package com.hzmg.akka;

import akka.actor.*;
import akka.pattern.Patterns;
import akka.routing.*;
import com.google.common.collect.Lists;
import com.hzmg.common.exception.AkkaCloudException;

import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AkkaRouteDemo {
    static class Worker extends AbstractActor {
        private final String name;
        Worker(String name){
            this.name=name;
        }
        @Override
        public Receive createReceive() {
            return receiveBuilder().match(Work.class, message -> {
                //通过自己来当做标识
                getContext().getSystem().log().info("I am {},I accept {}",getSelf(),message.payload);
                getSender().tell("I am "+name+" and I receive your msg!",getSelf());
            }).build();
        }

    }

    static final class Work implements Serializable {
        private static final long serialVersionUID = 1L;
        public final String payload;

        public Work(String payload) {
            this.payload = payload;
        }
    }

    static class Master extends AbstractActor {
        Router router;

        {
            List<Routee> routees = new ArrayList<>();
            for (int i = 1; i <= 5; i++) {
                int finalI = i;
                ActorRef r = getContext().actorOf(Props.create(Worker.class,() ->new Worker("worker"+ finalI)),"worker"+finalI);
                getContext().watch(r);
                routees.add(new ActorRefRoutee(r));
            }
            //轮询路由逻辑
            router = new Router(new RoundRobinRoutingLogic(), routees);
            //随机路由逻辑
            //router=new Router(new RandomRoutingLogic(),routees);
            //SmallestMailboxRoutingLogic最小邮箱（空闲邮箱）路由逻辑
            //router=new Router(new SmallestMailboxRoutingLogic(),routees);
        }

        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .match(
                            Work.class,
                            message -> {
                                router.route(message, getSender());
                            })
                    .match(
                            Terminated.class,
                            message -> {
                                router = router.removeRoutee(message.actor());
                                ActorRef r = getContext().actorOf(Props.create(Worker.class));
                                getContext().watch(r);
                                router = router.addRoutee(new ActorRefRoutee(r));
                            })
                    .build();
        }
    }
public static void main(String[] args){
    //创建Actor工厂
    ActorSystem actorSystem = ActorSystem.create("akka-server");
    //ActorRef ref=actorSystem.actorOf(Props.create(Master.class), "master_actor");
    /*List<String> paths = Arrays.asList("akka://akka-server/user/master_actor/worker1", "akka://akka-server/user/master_actor/worker2", "akka://akka-server/user/master_actor/worker3","akka://akka-server/user/master_actor/worker4","akka://akka-server/user/master_actor/worker5");
    List<String> finalPaths= Lists.newArrayList();
    finalPaths.addAll(paths);
    ActorRef router4 = actorSystem.actorOf(new RoundRobinGroup(finalPaths).props(), "router4");
    actorSystem.actorOf(Props.create(Worker.class,"worker6"),"worker6");
    finalPaths.add("akka://akka-server/user/worker6");
    router4.tell(new AddRoutee(new ActorSelectionRoutee(actorSystem.actorSelection(finalPaths.get(5)))),router4);
    router4.tell(new AddRoutee(new ActorSelectionRoutee(actorSystem.actorSelection(finalPaths.get(1)))),router4);
    //删除旧路由，
    router4.tell(new Broadcast(Kill.getInstance()), router4);*/
    //测试池路由是否可行 最后说明是可行的
    ActorRef router=actorSystem.actorOf(new RoundRobinPool(5).props(Props.create(Worker.class,"test")),"test");
    ActorSelection routerSelection=actorSystem.actorSelection("user/test");
    for(int i=0;i<10;i++){
        //CompletionStage ask = Patterns.ask(router, new Work("hello:"+i), Duration.ofMillis(5000));
        //selection test
        CompletionStage ask = Patterns.ask(routerSelection, new Work("hello:"+i), Duration.ofMillis(5000));
        //.tell(new Work("hello:"+i),ref);
        try {
            String result= (String) ask.toCompletableFuture().get(5000, TimeUnit.MILLISECONDS);
            actorSystem.log().info(result);
        } catch (Exception e) {
            throw new AkkaCloudException(e);
        }
    }

}
}
